# 示例13 - 集群化的Quartz
说明
此作业演示如何在集群环境中使用Quartz,以及Quartz如何使用数据库持久化调度信息。
# SimpleRecoveryJob.java源码
package org.quartz.examples.example13;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
/**
* Job的一个缺陷实现,用于单元测试
* 作者:James House
*/
public class SimpleRecoveryJob implements Job {
private static Logger _log = LoggerFactory.getLogger(SimpleRecoveryJob.class);
private static final String COUNT = "count";
/**
* Quartz需要一个公共的空参构造函数,以便调度器可以在需要时实例化类
*/
public SimpleRecoveryJob() {
}
/**
* 当与此作业相关联的触发器触发时,由调度器调用此方法
* Throws:作业执行异常(JobExecutionException)-当执行作业时产生异常
*/
public void execute(JobExecutionContext context) throws JobExecutionException {
JobKey jobKey = context.getJobDetail().getKey();
//如果作业正在恢复,则打印消息
if (context.isRecovering()) {
_log.info("SimpleRecoveryJob: " + jobKey + " RECOVERING at " + new Date());
} else {
_log.info("SimpleRecoveryJob: " + jobKey + " starting at " + new Date());
}
//延迟10秒
long delay = 10L * 1000L;
try {
Thread.sleep(delay);
} catch (Exception e) {
//
}
JobDataMap data = context.getJobDetail().getJobDataMap();
int count;
if (data.containsKey(COUNT)) {
count = data.getInt(COUNT);
} else {
count = 0;
}
count++;
data.put(COUNT, count);
_log.info("SimpleRecoveryJob: " + jobKey + " done at " + new Date() + "\n Execution #" + count);
}
}
# SimpleRecoveryStatefulJob.java源码
package org.quartz.examples.example13;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.PersistJobDataAfterExecution;
/**
* 此作业具有与SimpleRecoveryJob相同的功能,只是此作业实现的是“有状态”的,因为它将在每次执行后自动重新持久化其数据
*(JobDataMap),并且一次只能执行JobDetail的一个实例
* 作者:Bill Kratzer
*/
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class SimpleRecoveryStatefulJob extends SimpleRecoveryJob {
public SimpleRecoveryStatefulJob() {
super();
}
}
# ClusterExample.java源码
package org.quartz.examples.example13;
import static org.quartz.DateBuilder.futureDate;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;
import org.quartz.DateBuilder.IntervalUnit;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleTrigger;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 用于测试/显示JDBCJobStore(JobStoreTX或JobStoreCMT)的群集功能
* 所有实例都必须使用不同的属性文件,因为它们的实例ID必须不同,但所有其他属性都应该相同
* 如果您希望它清除现有的作业和触发器,请传递一个名为“clearJobs”的命令行参数
* 您可能应该从一组“新的”表开始(假设您可能有来自其他测试的数据遗留在其中),因为将非集群设置的数据与集群设置的混合可能会很糟糕
* 尝试在运行时杀死其中一个群集实例,并查看其余实例是否恢复正在进行的作业。注意,在默认设置下,检测故障可能需要15秒左右的时间
* 也可以尝试在调度程序中注册/不注册关机挂钩插件的情况下运行它。(org.quartz.plugins.management.ShutdownHookPlugin)
* 注意:不要在单独的机器上运行集群,除非它们的时钟使用某种形式的时间同步服务(如NTP守护程序)进行同步
*
* 参考:SimpleRecoveryJob
* 参考:SimpleRecoveryStatefulJob
* 作者:James House
*/
public class ClusterExample {
private static Logger _log = LoggerFactory.getLogger(ClusterExample.class);
public void run(boolean inClearJobs, boolean inScheduleJobs) throws Exception {
//首先,我们必须获得对调度器的引用
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();
if (inClearJobs) {
_log.warn("***** Deleting existing jobs/triggers *****");
sched.clear();
}
_log.info("------- Initialization Complete -----------");
if (inScheduleJobs) {
_log.info("------- Scheduling Jobs ------------------");
String schedId = sched.getSchedulerInstanceId();
int count = 1;
//将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
//在调度程序停止时重新执行此作业的位置
JobDetail job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
.requestRecovery()
.build();
SimpleTrigger trigger = newTrigger().withIdentity("triger_" + count, schedId)
.startAt(futureDate(1, IntervalUnit.SECOND))
.withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(5)).build();
_log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
+ trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
sched.scheduleJob(job, trigger);
count++;
//将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
//在调度程序停止时重新执行此作业的位置
job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
.requestRecovery()
.build();
trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(2, IntervalUnit.SECOND))
.withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(5)).build();
_log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
+ trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
sched.scheduleJob(job, trigger);
count++;
//将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
//在调度程序停止时重新执行此作业的位置
job = newJob(SimpleRecoveryStatefulJob.class).withIdentity("job_" + count, schedId)
.requestRecovery()
.build();
trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
.withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(3)).build();
_log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
+ trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
sched.scheduleJob(job, trigger);
count++;
//将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
//在调度程序停止时重新执行此作业的位置
job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
.requestRecovery()
.build();
trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
.withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(4)).build();
_log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " & repeat: " + trigger.getRepeatCount()
+ "/" + trigger.getRepeatInterval());
sched.scheduleJob(job, trigger);
count++;
//将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
//在调度程序停止时重新执行此作业的位置
job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
.requestRecovery()
.build();
trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
.withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInMilliseconds(4500L)).build();
_log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " & repeat: " + trigger.getRepeatCount()
+ "/" + trigger.getRepeatInterval());
sched.scheduleJob(job, trigger);
}
//在调用start()之前,作业不会启动
_log.info("------- Starting Scheduler ---------------");
sched.start();
_log.info("------- Started Scheduler ----------------");
_log.info("------- Waiting for one hour... ----------");
try {
Thread.sleep(3600L * 1000L);
} catch (Exception e) {
//
}
_log.info("------- Shutting Down --------------------");
sched.shutdown();
_log.info("------- Shutdown Complete ----------------");
}
public static void main(String[] args) throws Exception {
boolean clearJobs = false;
boolean scheduleJobs = true;
for (String arg : args) {
if (arg.equalsIgnoreCase("clearJobs")) {
clearJobs = true;
} else if (arg.equalsIgnoreCase("dontScheduleJobs")) {
scheduleJobs = false;
}
}
ClusterExample example = new ClusterExample();
example.run(clearJobs, scheduleJobs);
}
}
# 集群配置
此示例需要数据库支持。示例使用 PostgreSQL 作为数据库,并通过 JDBC JobStore 实现集群。
# instance1.properties
实例1的配置文件,实例ID为 instance_one:
#============================================================================
# 配置主调度器属性
#============================================================================
org.quartz.scheduler.instanceName: TestScheduler
org.quartz.scheduler.instanceId: instance_one
#============================================================================
# 配置线程池
#============================================================================
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 5
org.quartz.threadPool.threadPriority: 5
#============================================================================
# 配置JobStore
#============================================================================
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
org.quartz.jobStore.useProperties=false
org.quartz.jobStore.dataSource=myDS
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true
#============================================================================
# 配置数据源
#============================================================================
org.quartz.dataSource.myDS.driver: org.postgresql.Driver
org.quartz.dataSource.myDS.URL: jdbc:postgresql://localhost:5432/quartz
org.quartz.dataSource.myDS.user: quartz
org.quartz.dataSource.myDS.password: quartz
org.quartz.dataSource.myDS.maxConnections: 5
org.quartz.dataSource.myDS.validationQuery: select 0
# instance2.properties
实例2的配置文件,实例ID为 instance_two,其余配置与实例1完全相同:
#============================================================================
# 配置主调度器属性
#============================================================================
org.quartz.scheduler.instanceName: TestScheduler
org.quartz.scheduler.instanceId: instance_two
#============================================================================
# 配置线程池
#============================================================================
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 5
org.quartz.threadPool.threadPriority: 5
#============================================================================
# 配置JobStore
#============================================================================
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
org.quartz.jobStore.useProperties=false
org.quartz.jobStore.dataSource=myDS
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true
#============================================================================
# 配置数据源
#============================================================================
org.quartz.dataSource.myDS.driver: org.postgresql.Driver
org.quartz.dataSource.myDS.URL: jdbc:postgresql://localhost:5432/quartz
org.quartz.dataSource.myDS.user: quartz
org.quartz.dataSource.myDS.password: quartz
org.quartz.dataSource.myDS.maxConnections: 5
org.quartz.dataSource.myDS.validationQuery: select 0
重要提示
两个实例的配置文件中,org.quartz.scheduler.instanceId 必须不同,但所有其他属性应保持一致。不要在时钟未同步的独立机器上运行集群,应使用时间同步服务(如NTP守护程序)来同步各机器时钟。
# 运行示例
# 前置准备
- 安装 PostgreSQL 数据库,创建名为
quartz的数据库和用户 - 执行 Quartz 发行包中提供的 SQL 表创建脚本(位于
docs/dbTables/目录),在数据库中创建 Quartz 所需的表 - 下载 PostgreSQL JDBC 驱动(可从 http://jdbc.postgresql.org 获取),将 jar 文件放入 Quartz 发行包的
lib文件夹中 - 根据实际环境修改
instance1.properties和instance2.properties中的数据库连接信息
# 启动步骤
Windows 用户:
- 如有必要,修改
instance1.bat和instance2.bat,设置JAVA_HOME和JDBC_CP - 运行
instance1.bat(可在启动时传入clearJobs参数清除数据库中已有的作业和触发器) - 第一个实例启动后,运行
instance2.bat(脚本会自动传入dontScheduleJobs参数,避免重复调度)
UNIX/Linux 用户:
- 如有必要,修改
instance1.sh和instance2.sh,设置JAVA_HOME和JDBC_CP - 执行
instance1.sh - 第一个实例启动后,执行
instance2.sh
故障转移测试
尝试在运行时终止其中一个集群实例,观察其余实例是否接管正在进行的作业。注意,在默认设置下,检测故障可能需要约15秒的时间。

微信公众号

QQ交流群
原创网站开发,偏差难以避免。
如若发现错误,诚心感谢反馈。
愿你倾心相念,愿你学有所成。
愿你朝华相顾,愿你前程似锦。
如若发现错误,诚心感谢反馈。
愿你倾心相念,愿你学有所成。
愿你朝华相顾,愿你前程似锦。